package com.xlz.manager;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xlz.domain.App;
import com.xlz.domain.AppWorker;
import com.xlz.domain.AppTask;
import com.xlz.service.AppInstanceService;
import com.xlz.service.AppService;
import com.xlz.service.AppWorkerService;
import com.xlz.service.AppTaskService;
import com.xlz.service.StatisticExecuteService;
import com.xlz.service.StatisticSelectService;
import com.xlz.statistics.StatisticTask;
import com.xlz.timer.CleanTask;
import com.xlz.timer.HeartBeatTask;
import com.xlz.timer.WorkerTask;
import com.xlz.timer.MasterTask;
import com.xlz.util.Common;
import com.xlz.util.DateUtil;
import com.xlz.worker.Worker;

/**
 * 默认分布式调度入口管理类.
 * @author 张蕾蕾
 * @date 2018 03 24
 */
public class DefaultDstManager {

	protected final Logger LOG = LoggerFactory.getLogger(getClass());

	private Map<Long, Worker> workerPool = new ConcurrentHashMap<Long, Worker>();
	// 非task的timer管理
	private Map<Long, Timer> timerPool = new ConcurrentHashMap<Long, Timer>();

	// 采用的是spring还是JavaApplication
	private String container = Common.RUNNTIME_CONTAINER_APPLICATION;

	private DstContext context = new DstContext();

	private DataSource dataSource;
	private AppInstanceService appInstanceService;
	private AppWorkerService appWorkerService;
	private AppService appService;
	private AppTaskService appTaskService;
	private StatisticSelectService statisticSelectService;
	private StatisticExecuteService statisticExecuteService;

	private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

	public DefaultDstManager() {
	}

	private void init() {
		appInstanceService = new AppInstanceService(dataSource);
		appWorkerService = new AppWorkerService(dataSource);
		appService = new AppService(dataSource);
		appTaskService = new AppTaskService(dataSource);
		statisticSelectService = new StatisticSelectService(dataSource);
		statisticExecuteService = new StatisticExecuteService(dataSource);
		//加载应用配置
		loadAppConfig();
	}

	public synchronized void start() throws IOException {
		// 初始化jdbc操作类
		init();
		LOG.info("启动任务调度，当前id:{}", context.getInstanceId());
		// 注册本机到数据库，同时需要 更新本机心跳,如果是新注册，需要更新本机拥有的任务为为准备就绪
		Timer heartBeatTimer = new Timer(true);
		heartBeatTimer.schedule(new HeartBeatTask(this, heartBeatTimer), 0);

		// 进行选主，主负责分配任务项（分片）
		// 分配完成之后将任务设置为可用,此时所有机器才可以获取任务进行初始化
		Timer refreshTasks = new Timer(true);
		refreshTasks.schedule(new MasterTask(this,refreshTasks), 10);
		
		// 定时清除过期数据(20s没有心跳的数据)
		service.scheduleWithFixedDelay(new CleanTask(this), 0, 30, TimeUnit.SECONDS);

		// 发送统计信息
		service.scheduleWithFixedDelay(new StatisticTask(this), 10, 10, TimeUnit.SECONDS);

	}

	/**
	 * 根据数据库中对task的配置进行本机task的刷新
	 */
	public void refreshTasks() {
		LOG.debug(
				"刷新任务应用任务，当前时间：" + DateUtil.parseDateToStr(new Date(), DateUtil.DATE_TIME_FORMAT_YYYY_MM_DD_HH_MI_SS));
		try {
			// 连接数据，获取app_task
			List<AppTask> tasks = appTaskService.getAllByAppNo(context.getAppNo());
			boolean diff = context.setInstanceUUIDs(appInstanceService.getAllByAppNo(context.getAppNo()));
			// 有效任务为0或者同一appNo下的机器是否有所增减（master内存中的机器缓存和数据库总的对比）
			if (tasks.size() == 0 || diff) {
				LOG.debug("停止所有任务，当前获取到的任务：{}" , tasks);
				// 停止所有任务
				long start = System.currentTimeMillis();
				destroy();
				long end = System.currentTimeMillis();
				LOG.debug("停止所有任务成功，耗时{}ms",(end-start));
			}
			// 未获取到机器列表，不能继续
			else {
				long start = System.currentTimeMillis();
				initTaskWorker(tasks);
				long end = System.currentTimeMillis();
				LOG.debug("初始化所有任务成功，耗时{}ms",(end-start));
			}
		} catch (Exception e) {
			LOG.error("刷新任务异常：", e);
			context.clearInstanceUUIDs();
		}

	}

	private void initTaskWorker(List<AppTask> tasks) {
		// 不存在差集，不必全部重新分配worker，需要判断是否存在单个task的改动，如果有改动需要重新分配worker
		for (AppTask task : tasks) {
			AppTask appTask = appTaskService.getByID(task.getId());
			LOG.debug("任务初始化：" + tasks);
			if (appTask.getTaskGroupCount() == null || appTask.getTaskGroupCount().intValue() == 0) {
				appTask.setTaskGroupCount(1);
			}
			// 判读任务是否可用
			if (appTask.getActive().intValue() == 1) {
				// 判断任务是否准备就绪
				if (appTask.getReadiness().intValue() == 0) {
					assignSingleTaskWorker(appTask);
				}
				// 获取task信息
				appTask = appTaskService.getByID(task.getId());
				if (appTask.getReadiness().intValue() == 1) {
					// 任务已经完成自动分片，准备就绪,开始创建调度任务
					if (appTask != null && appTask.getReadiness().intValue() == 1) {
						// 需要根据appNo获取分配到的任务,进行调度初始化
						List<AppWorker> workerList = appWorkerService
								.getAllByTaskId(appTask.getId(), context.getInstanceId());
						LOG.debug("获取到当前机器获取到的分片，当前任务{}，进行任务初始化{}", appTask, workerList);
						initTaskJob(appTask, workerList);
					} else {
						LOG.debug("任务状态为未准备就绪，需要停止当前任务{}", appTask);
						// 停止任务
						stopSingleTaskJobs(appTask);
					}
				} else {
					LOG.debug("任务状态为未准备就绪，需要停止当前任务{}", appTask);
					// 停止任务
					stopSingleTaskJobs(appTask);
				}
			} else {
				LOG.debug("任务状态为未激活，需要停止当前任务{}", tasks);
				// 停止任务
				stopSingleTaskJobs(task);
			}
		}

	}

		/**
	 * 对单个任务进行worker分配
	 * 
	 * @param task
	 */
	private void assignSingleTaskWorker(AppTask task) {
		LOG.debug("获取当前分片信息{}", task);
		// 查询是否有工作组为完成处理，停止工作
		int count = appWorkerService.findCount(task);
		if (count != 0) {
			LOG.debug("等待当前任务{}所有的工作组都停止后在进行重新分片", task);
			return;
		}
		List<String> bindIps = Arrays.asList(task.getBindIp().split(","));
		// 如果是master则进行重新分配任务项
		List<AppWorker> workerList = new ArrayList<AppWorker>();
		if (context.isMaster()) {
			LOG.debug("Master准备对当前任务进行分片：" + task);
			String jobItems[] = task.getShardItem().split("\\|");
			// 确定要处理任务的工作组
			for (int i = 0; i < task.getTaskGroupCount().intValue(); i++) {
				for (String uuid : context.getInstanceUUIDs()) {
					String ip = uuid.substring(0, uuid.indexOf("$"));
					String workerName = appendTaskPrefix(uuid, task.getId().intValue() + "_" + i);
					// 保定的ip才生效
					if (task.getBindIp().equals("127.0.0.1") || bindIps.contains(ip)) {
						AppWorker worker = new AppWorker();
						worker.setRegisterWorker(workerName);
						worker.setRegisterInstanceId(uuid);
						worker.setTaskId(task.getId());
						worker.setActive(1);
						worker.setAppNo(context.getAppNo());
						workerList.add(worker);
					}
				}
			}
			if(workerList.size() > 0){
				// 将任务项 进行重新分配，并且以同一事物更新进数据库app_task_instance的task_item中
				for (int i = 0; i < jobItems.length; i++) {
					// 任务项归属
					int ascription = i % workerList.size();
					AppWorker worker = workerList.get(ascription);
					worker.setShardItem(
							worker.getShardItem() == null ? jobItems[i] : worker.getShardItem() + "|" + jobItems[i]);
				}
	
				// 同一事物更新到数据库，并且将任务设置为准备就绪（此时其他机器才可以获取自己的任务项进行调度）
				appWorkerService.insertBatch(workerList);
				try {
					appTaskService.updateReadinessById(task.getId(), 1);
					LOG.debug("Master完成对当前任务进行分片{}", task);
				} catch (SQLException e) {
					LOG.error("设置当前任务状态为准备就绪异常，{}", task, e);
				}
			}
		}
	}

	private synchronized void initTaskJob(AppTask task, List<AppWorker> workerList) {
		for (AppWorker job : workerList) {
			try {
				Worker worker = workerPool.get(job.getId());
				if (worker != null) {
					if(worker.isRunning()){
						AppTask currTask = workerPool.get(job.getId()).getAppTask();
						currTask.setActive(task.getActive());
						currTask.setReadiness(task.getReadiness());
						currTask.setExecuteMethod(task.getExecuteMethod());
						continue;
					}else if(!worker.isStoped()){
						continue;
					}
				}
				Timer timer = new Timer();
				WorkerTask ischedultRunTask = new WorkerTask(null, timer, this);
				worker = ischedultRunTask.init(job, task);
				worker.setRegisterWorker(job.getRegisterWorker());
				workerPool.put(job.getId(), worker);
				timerPool.put(job.getId(), timer);
			} catch (Exception e) {
				LOG.error("初始化工作组异常,", job, e);
			}
		}
	}

	/**
	 * 停止所有的任务
	 */
	private void destroy() {
		try {
			// 将任务设置为没有准备就绪
			if (context.isMaster()) {
				LOG.info("终止当前应用所有的调度");
				// 修改任务未准备就绪
				appTaskService.updateActiveByAppNo(context.getAppNo(), 0);
			}
			stopAllJobs();
		} catch (SQLException e) {
			LOG.error("停止应用任务设置数据库状态异常,{}", context.getAppNo(), e);
		}
	}
	
	public void stopAllJobs(){
		// 停止所有机器当前task的所有Job
		if(!workerPool.isEmpty()){
			for (Map.Entry<Long, Worker> entry : workerPool.entrySet()) {
				try {
					stopAndRemove(entry.getKey(), entry.getValue());
				} catch (SQLException e) {
					LOG.error("停止单个任务设置数据库状态异常,{}", entry.getKey(), e);
				}
			}
		}
	}
	
	/**
	 * 停止单个任务的所有Job
	 */
	public void stopSingleTaskJobs(AppTask task) {
		LOG.debug("终止当前应用的单个任务调度,任务：{}", task);
		try {
			if (context.isMaster()) {
				// 修改任务未准备就绪
				if(task.getReadiness() == 1){
					appTaskService.updateReadinessById(task.getId(), 0);
				}
				// 修改状态为不可用
				deleteByTaskId(task);
			}
			// 停止所有机器当前task的所有Job
			for (Map.Entry<Long, Worker> entry : workerPool.entrySet()) {
				if (entry.getValue().getAppTask().getId().longValue() == task.getId().longValue()) {
					stopAndRemove(entry.getKey(), entry.getValue());
				}
			}
		} catch (SQLException e) {
			LOG.error("停止单个任务设置数据库状态异常,{}", task, e);
		}
	}

	public boolean deleteByTaskId(AppTask task) throws SQLException {
		String sql = "select id,task_id,register_instance_id,register_worker,create_time,shard_item,load_time,execute_time,load_total,wait_deal_total "
				+ 	 "from app_worker "
				+ 	 "where active=1 and task_id=" + task.getId()+ " and update_time < date_add(now(), interval -"+task.getDeadTime()+" second)";
		List<AppWorker> list = appWorkerService.findAll(sql, AppWorker.class);
		for(AppWorker entity : list){
			boolean result = appWorkerService.deleteById(entity.getId());
			if(result){
				workerPool.remove(entity.getId());
			}
		}
		return true;
	}
	
	private void stopAndRemove(Long workerId, Worker worker) throws SQLException {
		Timer timer = timerPool.remove(workerId);
		try {
			if (timer != null) {
				timer.cancel();
				timer = null;
			}
		} catch (Exception e) {

		}
		try {
			if (worker != null) {
				worker.getAppTask().setReadiness(0);
				worker.flushStatistic();
				worker.shutdown();
			}
		} catch (Exception e) {
			LOG.error("终止当前工作组，任务组{}", workerId);
			if (worker != null) {
				LOG.error("终止当前工作组，所属任务为{}", worker.getAppTask());
			}
		}
		if (worker != null && worker.isStoped()) {
			appWorkerService.deleteById(workerId);
			workerPool.remove(workerId);
		}
	}

	public long loadAppConfig(){
		long heartBeatRate = 3000l;
		//查询app表，获取心跳时间
		App app = appService.getByAppNo(getContext().getAppNo());
		if(app != null && app.getHeartBeatRate() != null){
			heartBeatRate = app.getHeartBeatRate();
			long time = app.getCurrentTime().getTime() - System.currentTimeMillis();
			if(Math.abs(time) > DateUtil.timeConvert( heartBeatRate) ){
				LOG.error("当前机器的时间和数据库服务器的时差为{}，请配置时间同步!",Math.abs(time));
			}
			
			if(app.getHeartBeatRate() != null){
				getContext().setHeartBeatRate(heartBeatRate  );
			}
			if(app.getDeadHeartCount() != null || app.getDeadHeartCount() > 5){
				this.getContext().setDeadHeartCount(app.getDeadHeartCount());
			}
		}
		return heartBeatRate;
	}
	
	private String appendTaskPrefix(String id, String key) {
		return context.getAppNo() + "$" + id + "$" + key;
	}

	// 启东市配置的参数

	public DataSource getDataSource() {
		return dataSource;
	}

	public void setDataSource(DataSource dataSource) {
		this.dataSource = dataSource;
	}

	public String getContainer() {
		return container;
	}

	public void setContainer(String container) {
		this.container = container;
	}

	////////////////////////////
	public AppInstanceService getAppInstanceService() {
		return appInstanceService;
	}

	public Map<Long, Worker> getWorkerPool() {
		return workerPool;
	}

	public AppWorkerService getAppWorkerService() {
		return appWorkerService;
	}

	public AppService getAppService() {
		return appService;
	}

	public StatisticSelectService getStatisticSelectService() {
		return statisticSelectService;
	}

	public StatisticExecuteService getStatisticExecuteService() {
		return statisticExecuteService;
	}

	public DstContext getContext() {
		return context;
	}
}
